[MongoDB Replication] Raw Change Streams#591
Conversation
🦋 Changeset detectedLatest commit: a011930 The changes in this PR will be included in the next version bump. This PR includes changesets to release 12 packages
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
Sleepful
left a comment
There was a problem hiding this comment.
The changes look good.
streamChangesInternal in particular is a really long function with a lot of edge cases. I suggested some places where it could be broken down a bit, if you find more places that could be extracted, I think that could help with reading through. It is a complex function though, and the complexity cannot be removed. I cannot validate that all the if statements for the various edge cases are correct, I think that's the main oversight of my review.
Other than that, I like the rawChangeStream change, using db.command instead of internal API of mongo driver.
I suggested some changes but not blocking on them. Also if you want to apply some of the suggestions, I figure it might be convenient to merge current PR and do a follow-up PR for any changes.
|
@Sleepful Thanks for the review, I implemented some of the cosmetic suggestions. (As mentioned in another comment) Note that for this specific PR, viewing the diff with "ignore whitespace changes" help a lot - a lot of the changes are in the diff simply because of the change in indentation. Regardless, that function is indeed long and difficult to follow. We can try to split that up in a future PR. |
|
Ohhh I keep forgetting about the whitespace, good reminder |
Merges upstream main which includes PR #591 (raw change streams) and PR #599 (direct BSON Buffer -> JSON conversion). Auth fix conflicts (types.ts, config.test.ts) resolved — both sides had the same fix, upstream also added database name decoding. ChangeStream.ts has 11 unresolved conflicts — PR #591 replaced the MongoDB driver ChangeStream with a custom RawChangeStream using raw aggregate + getMore. Our Cosmos DB changes need to be re-applied to the new code structure. Resolved in the next commit.
Re-applied all Cosmos DB changes to the new raw change stream code structure from PR #591. The raw aggregate approach is better for Cosmos DB: no lazy ChangeStream init, explicit cursor management, $changeStream stage built directly in pipeline. Changes applied to new structure: - detectCosmosDb() calls in getSnapshotLsn, initReplication, streamChangesInternal - getEventTimestamp() adapted to ProjectedChangeStreamDocument type - Sentinel checkpoint with BSON.deserialize for fullDocument (raw Buffer) - Pipeline guards: skip $changeStreamSplitLargeEvent and showExpandedEvents - Cluster-level aggregate (admin db + allChangesForCluster) when isCosmosDb - startAtOperationTime fix (startAfter != null) - Keepalive guard for Cosmos DB resume tokens - .lte() dedup guard skip on Cosmos DB - wallTime tracking for replication lag Verified: 59/59 standard MongoDB tests pass. Cosmos DB tests blocked by cluster downtime (TLS connection timeout). Code audit of RawChangeStream.ts found no Cosmos DB compatibility issues — cursor ID type is automatically fixed by BigInt conversion, postBatchResumeToken needs empirical verification.
Re-applied all Cosmos DB changes to the new raw change stream code structure from PR #591. The raw aggregate approach is better for Cosmos DB: no lazy ChangeStream init, explicit cursor management, $changeStream stage built directly in pipeline. Changes applied to new structure: - detectCosmosDb() calls in getSnapshotLsn, initReplication, streamChangesInternal - getEventTimestamp() adapted to ProjectedChangeStreamDocument type - Sentinel checkpoint with BSON.deserialize for fullDocument (raw Buffer) - Pipeline guards: skip $changeStreamSplitLargeEvent and showExpandedEvents - Cluster-level aggregate (admin db + allChangesForCluster) when isCosmosDb - startAtOperationTime fix (startAfter != null) - Keepalive guard for Cosmos DB resume tokens - .lte() dedup guard skip on Cosmos DB - wallTime tracking for replication lag - Added changeset for @powersync/service-module-mongodb (minor) Verified: 59/59 standard MongoDB tests pass. Cosmos DB cluster is currently down — tests blocked by TLS timeout. Code audit of RawChangeStream.ts found no compatibility issues: cursor ID type auto-fixed by BigInt, postBatchResumeToken needs empirical verification when cluster is back.
Merges upstream main which includes PR #591 (raw change streams) and PR #599 (direct BSON Buffer -> JSON conversion). Auth fix conflicts (types.ts, config.test.ts) resolved — both sides had the same fix, upstream also added database name decoding. ChangeStream.ts has 11 unresolved conflicts — PR #591 replaced the MongoDB driver ChangeStream with a custom RawChangeStream using raw aggregate + getMore. Our Cosmos DB changes need to be re-applied to the new code structure. Resolved in the next commit. resolve: ChangeStream.ts merge conflicts for raw change streams Re-applied all Cosmos DB changes to the new raw change stream code structure from PR #591. The raw aggregate approach is better for Cosmos DB: no lazy ChangeStream init, explicit cursor management, $changeStream stage built directly in pipeline. Changes applied to new structure: - detectCosmosDb() calls in getSnapshotLsn, initReplication, streamChangesInternal - getEventTimestamp() adapted to ProjectedChangeStreamDocument type - Sentinel checkpoint with BSON.deserialize for fullDocument (raw Buffer) - Pipeline guards: skip $changeStreamSplitLargeEvent and showExpandedEvents - Cluster-level aggregate (admin db + allChangesForCluster) when isCosmosDb - startAtOperationTime fix (startAfter != null) - Keepalive guard for Cosmos DB resume tokens - .lte() dedup guard skip on Cosmos DB - wallTime tracking for replication lag - Added changeset for @powersync/service-module-mongodb (minor) Verified: 59/59 standard MongoDB tests pass. Cosmos DB cluster is currently down — tests blocked by TLS timeout. Code audit of RawChangeStream.ts found no compatibility issues: cursor ID type auto-fixed by BigInt, postBatchResumeToken needs empirical verification when cluster is back.
Merges upstream main which includes PR #591 (raw change streams) and PR #599 (direct BSON Buffer -> JSON conversion). Auth fix conflicts (types.ts, config.test.ts) resolved — both sides had the same fix, upstream also added database name decoding. ChangeStream.ts has 11 unresolved conflicts — PR #591 replaced the MongoDB driver ChangeStream with a custom RawChangeStream using raw aggregate + getMore. Our Cosmos DB changes need to be re-applied to the new code structure. Resolved in the next commit. resolve: ChangeStream.ts merge conflicts for raw change streams Re-applied all Cosmos DB changes to the new raw change stream code structure from PR #591. The raw aggregate approach is better for Cosmos DB: no lazy ChangeStream init, explicit cursor management, $changeStream stage built directly in pipeline. Changes applied to new structure: - detectCosmosDb() calls in getSnapshotLsn, initReplication, streamChangesInternal - getEventTimestamp() adapted to ProjectedChangeStreamDocument type - Sentinel checkpoint with BSON.deserialize for fullDocument (raw Buffer) - Pipeline guards: skip $changeStreamSplitLargeEvent and showExpandedEvents - Cluster-level aggregate (admin db + allChangesForCluster) when isCosmosDb - startAtOperationTime fix (startAfter != null) - Keepalive guard for Cosmos DB resume tokens - .lte() dedup guard skip on Cosmos DB - wallTime tracking for replication lag - Added changeset for @powersync/service-module-mongodb (minor) Verified: 59/59 standard MongoDB tests pass. Cosmos DB cluster is currently down — tests blocked by TLS timeout. Code audit of RawChangeStream.ts found no compatibility issues: cursor ID type auto-fixed by BigInt, postBatchResumeToken needs empirical verification when cluster is back.
Re-applied all Cosmos DB changes to the new raw change stream code structure from PR #591. The raw aggregate approach is better for Cosmos DB: no lazy ChangeStream init, explicit cursor management, $changeStream stage built directly in pipeline. Changes applied to new structure: - detectCosmosDb() calls in getSnapshotLsn, initReplication, streamChangesInternal - getEventTimestamp() adapted to ProjectedChangeStreamDocument type - Sentinel checkpoint with BSON.deserialize for fullDocument (raw Buffer) - Pipeline guards: skip $changeStreamSplitLargeEvent and showExpandedEvents - Cluster-level aggregate (admin db + allChangesForCluster) when isCosmosDb - startAtOperationTime fix (startAfter != null) - Keepalive guard for Cosmos DB resume tokens - .lte() dedup guard skip on Cosmos DB - wallTime tracking for replication lag - Added changeset for @powersync/service-module-mongodb (minor) Verified: 59/59 standard MongoDB tests pass. Cosmos DB cluster is currently down — tests blocked by TLS timeout. Code audit of RawChangeStream.ts found no compatibility issues: cursor ID type auto-fixed by BigInt, postBatchResumeToken needs empirical verification when cluster is back.
Merges upstream main which includes PR #591 (raw change streams) and PR #599 (direct BSON Buffer -> JSON conversion). Auth fix conflicts (types.ts, config.test.ts) resolved — both sides had the same fix, upstream also added database name decoding. ChangeStream.ts has 11 unresolved conflicts — PR #591 replaced the MongoDB driver ChangeStream with a custom RawChangeStream using raw aggregate + getMore. Our Cosmos DB changes need to be re-applied to the new code structure. Resolved in the next commit. resolve: ChangeStream.ts merge conflicts for raw change streams Re-applied all Cosmos DB changes to the new raw change stream code structure from PR #591. The raw aggregate approach is better for Cosmos DB: no lazy ChangeStream init, explicit cursor management, $changeStream stage built directly in pipeline. Changes applied to new structure: - detectCosmosDb() calls in getSnapshotLsn, initReplication, streamChangesInternal - getEventTimestamp() adapted to ProjectedChangeStreamDocument type - Sentinel checkpoint with BSON.deserialize for fullDocument (raw Buffer) - Pipeline guards: skip $changeStreamSplitLargeEvent and showExpandedEvents - Cluster-level aggregate (admin db + allChangesForCluster) when isCosmosDb - startAtOperationTime fix (startAfter != null) - Keepalive guard for Cosmos DB resume tokens - .lte() dedup guard skip on Cosmos DB - wallTime tracking for replication lag - Added changeset for @powersync/service-module-mongodb (minor) Verified: 59/59 standard MongoDB tests pass. Cosmos DB cluster is currently down — tests blocked by TLS timeout. Code audit of RawChangeStream.ts found no compatibility issues: cursor ID type auto-fixed by BigInt, postBatchResumeToken needs empirical verification when cluster is back.
Merges upstream main which includes PR #591 (raw change streams) and PR #599 (direct BSON Buffer -> JSON conversion). Auth fix conflicts (types.ts, config.test.ts) resolved — both sides had the same fix, upstream also added database name decoding. ChangeStream.ts has 11 unresolved conflicts — PR #591 replaced the MongoDB driver ChangeStream with a custom RawChangeStream using raw aggregate + getMore. Our Cosmos DB changes need to be re-applied to the new code structure. Resolved in the next commit. resolve: ChangeStream.ts merge conflicts for raw change streams Re-applied all Cosmos DB changes to the new raw change stream code structure from PR #591. The raw aggregate approach is better for Cosmos DB: no lazy ChangeStream init, explicit cursor management, $changeStream stage built directly in pipeline. Changes applied to new structure: - detectCosmosDb() calls in getSnapshotLsn, initReplication, streamChangesInternal - getEventTimestamp() adapted to ProjectedChangeStreamDocument type - Sentinel checkpoint with BSON.deserialize for fullDocument (raw Buffer) - Pipeline guards: skip $changeStreamSplitLargeEvent and showExpandedEvents - Cluster-level aggregate (admin db + allChangesForCluster) when isCosmosDb - startAtOperationTime fix (startAfter != null) - Keepalive guard for Cosmos DB resume tokens - .lte() dedup guard skip on Cosmos DB - wallTime tracking for replication lag - Added changeset for @powersync/service-module-mongodb (minor) Verified: 59/59 standard MongoDB tests pass. Cosmos DB cluster is currently down — tests blocked by TLS timeout. Code audit of RawChangeStream.ts found no compatibility issues: cursor ID type auto-fixed by BigInt, postBatchResumeToken needs empirical verification when cluster is back.
Supersedes #309.
This switches to using aggregate/getMore commands directly, instead of the built-in watch methods.
Motivations:
This removes hacks previously using private APIs from:
This is also the first step in writing a more efficient conversion for raw BSON -> serialized JSON. We can't do that with the driver's change stream implementation, since that doesn't support getting results as raw BSON.
TODO: